Skip to content

feat(connectors): add AsyncDBConnectorProtocol; implement on PostgreSQL and SQLite#145

Open
kurodo3[bot] wants to merge 19 commits into
mainfrom
eywalker/plt-1453-add-async-methods-to-dbconnectorprotocol-implement-across
Open

feat(connectors): add AsyncDBConnectorProtocol; implement on PostgreSQL and SQLite#145
kurodo3[bot] wants to merge 19 commits into
mainfrom
eywalker/plt-1453-add-async-methods-to-dbconnectorprotocol-implement-across

Conversation

@kurodo3

@kurodo3 kurodo3 Bot commented May 26, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Adds AsyncDBConnectorProtocol — a standalone @runtime_checkable Protocol with 7 async methods (__aenter__, __aexit__, async_close, async_get_table_names, async_get_pk_columns, async_get_column_info, async_iter_batches)
  • PostgreSQLConnector: native async via psycopg3 AsyncConnection / AsyncServerCursor; dedicated connection per async_iter_batches call for portal isolation
  • SQLiteConnector: asyncio.to_thread() wrappers over existing sync methods (thread-safe: check_same_thread=False + threading.RLock already in place)
  • SpiralDBConnector deferred to PLT-1456 (lacks thread-safety prerequisite)
  • Unit tests for both connectors; async integration tests for PostgreSQL

Test plan

  • uv run pytest tests/test_databases/test_postgresql_connector.py -v — PASS
  • uv run pytest tests/test_databases/test_sqlite_connector.py -v — PASS
  • uv run pytest tests/test_databases/test_postgresql_connector_integration.py -m postgres -v — PASS (CI)

Closes PLT-1453

@codecov

codecov Bot commented May 26, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 85.93750% with 18 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/orcapod/databases/postgresql_connector.py 87.35% 11 Missing ⚠️
...c/orcapod/protocols/async_db_connector_protocol.py 66.66% 7 Missing ⚠️

📢 Thoughts on this report? Let us know!

Comment thread pytest.ini
python_classes = Test*
python_functions = test_*
addopts = -v
asyncio_default_fixture_loop_scope = function

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this do?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`asyncio_default_fixture_loop_scope = function` tells pytest-asyncio to create a fresh event loop for each test function. Without it, pytest-asyncio ≥ 0.23 emits a `DeprecationWarning` and will error in a future release. The `function` scope is the strictest setting — it prevents any async state from leaking between tests.

Comment thread src/orcapod/databases/__init__.py Outdated
from .spiraldb_connector import SpiralDBConnector
from .sqlite_connector import SQLiteConnector
from .postgresql_connector import PostgreSQLConnector
from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be exported as part of this package -- rather have the protocol sub package just export this protocol

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed. AsyncDBConnectorProtocol is no longer exported from orcapod.databases — it lives exclusively in orcapod.protocols. Also added DBConnectorProtocol to protocols/__init__ so both connector protocols are exported from a single place (commit 8a1fdae).

Returns:
Dict mapping column name to Arrow DataType.
"""
if re.search(r"\bJOIN\b", query, re.IGNORECASE):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is much shared logic between sync and async version and there is NOT much async used actually, then extract and consolidate

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted the shared parts (commit 8a1fdae):

  • SQL strings_SQL_TABLE_NAMES, _SQL_PK_COLUMNS, _SQL_COLUMN_INFO are now module-level constants. Sync and async schema methods each reference the same string; no duplication.
  • Query-parsing logic — the FROM-clause parsing block that was copy-pasted verbatim between _resolve_column_type_lookup and _async_resolve_column_type_lookup is now a standalone _parse_table_from_query(query) -> str | None helper. Each lookup function is now 3 lines: call the helper, early-return {} on None, then make the single connector call (sync vs async, using their respective connections).

Sync and async connections are kept separate — the async methods continue to use _async_conn (psycopg3 AsyncConnection) rather than delegating to asyncio.to_thread(sync_method).

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces an async connector interface for relational DB connectors and implements it for the existing PostgreSQL and SQLite connectors, enabling native use in asyncio-based pipelines without requiring callers to manage thread executors themselves.

Changes:

  • Added a standalone AsyncDBConnectorProtocol and exported it via orcapod.protocols and orcapod.databases.
  • Implemented async lifecycle, async schema introspection, and async batch iteration for PostgreSQLConnector (native psycopg3 async) and SQLiteConnector (wrapping sync methods via asyncio.to_thread()).
  • Added unit tests for async methods (SQLite + PostgreSQL) and PostgreSQL async integration tests; updated pytest asyncio loop-scope configuration.

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
tests/test_databases/test_sqlite_connector.py Adds async-method unit tests against an in-memory SQLite database.
tests/test_databases/test_postgresql_connector.py Adds unit tests for PostgreSQL async lifecycle, async schema introspection, and async batch iteration (mocked psycopg).
tests/test_databases/test_postgresql_connector_integration.py Adds PostgreSQL async integration tests for lifecycle, schema introspection, and async batch iteration.
superpowers/specs/2026-05-23-async-db-connector-protocol-design.md Adds an approved design spec describing the async protocol and connector behavior.
superpowers/plans/2026-05-23-async-db-connector-protocol.md Adds an implementation plan documenting approach and testing steps.
src/orcapod/protocols/async_db_connector_protocol.py Introduces AsyncDBConnectorProtocol definition.
src/orcapod/protocols/init.py Re-exports AsyncDBConnectorProtocol from orcapod.protocols.
src/orcapod/databases/sqlite_connector.py Implements async methods using asyncio.to_thread() wrappers over sync behavior.
src/orcapod/databases/postgresql_connector.py Implements async lifecycle, async schema introspection, and async_iter_batches using psycopg3 async APIs.
src/orcapod/databases/init.py Re-exports AsyncDBConnectorProtocol from orcapod.databases.
pytest.ini Sets asyncio_default_fixture_loop_scope = function for pytest-asyncio behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +556 to +565
async def async_close(self) -> None:
"""Close async and sync connections.

Implementations must be idempotent — calling this multiple times must
not raise.
"""
if self._async_conn is not None:
await self._async_conn.close()
self._async_conn = None
self.close()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Changed self.close() to await asyncio.to_thread(self.close) so the sync shutdown path (lock acquisition + psycopg3 network disconnect) runs in a worker thread instead of on the event loop. Also added import asyncio to the module imports. Fixed in commit 6734c26.

Comment on lines +664 to +668
self._require_async_open() # guard: raise if context manager not entered

with self._lock:
self._require_open() # guard: raise if sync connector is closed
dsn = self._dsn

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Removed the with self._lock: wrapper. _dsn is assigned once in __init__ and never reassigned, so it's safe to read from any thread without a lock. _require_open() is a simple if self._conn is None null-check — even without the lock the worst-case race is an already-closed connector slipping past the guard, which is a programmer error regardless. Both reads are now lock-free with a comment noting _dsn's immutability. Fixed in commit 6734c26.

@kurodo3

kurodo3 Bot commented May 26, 2026

Copy link
Copy Markdown
Contributor Author

Review response — commit 6734c26

Fix 1: async_close() — sync shutdown offloaded to worker thread

async_close() was calling self.close() directly on the event-loop thread. close() acquires threading.RLock and calls psycopg.Connection.close(), which sends a network disconnect to PostgreSQL — both are blocking operations.

Before:

self.close()

After:

await asyncio.to_thread(self.close)

import asyncio was also added to the module-level imports.


Fix 2: async_iter_batches() — lock removed from event-loop-thread reads

The with self._lock: block guarding _require_open() and dsn = self._dsn could stall the event loop if a sync thread held the lock during a long write.

  • _dsn is set exactly once in __init__ and never reassigned — no lock is needed for reads.
  • _require_open() is a if self._conn is None null-check — it doesn't mutate state and a potential race here is already a programmer-error scenario (concurrent close + read).

Before:

with self._lock:
    self._require_open()
    dsn = self._dsn

After:

self._require_open()
dsn = self._dsn  # immutable after __init__; no lock needed

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.

Comment on lines +5 to +11
from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol

__all__ = [
"AsyncDBConnectorProtocol",
"DataExecutionLoggerProtocol",
"ExecutionObserverProtocol",
]

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. DBConnectorProtocol is now imported and exported from orcapod.protocols.__init__ alongside AsyncDBConnectorProtocol (commit 8a1fdae).

Comment on lines +693 to +699
await cur.close()
try:
await read_conn.rollback()
except Exception:
pass
await read_conn.close()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improved. Each cleanup step (cur.close, read_conn.rollback, read_conn.close) is now wrapped in its own try/except Exception block so one failure cannot skip the others (commit 8a1fdae). A comment notes the remaining CancelledError limitation: full asyncio.shield() protection is deferred as out-of-scope for this PR.

@kurodo3

kurodo3 Bot commented May 27, 2026

Copy link
Copy Markdown
Contributor Author

Review response — commit 8a1fdae

Fix 1: pytest.ini — replied with explanation

asyncio_default_fixture_loop_scope = function is required by pytest-asyncio ≥ 0.23. No code change.


Fix 2: AsyncDBConnectorProtocol removed from orcapod.databases

The protocol is now exported exclusively from orcapod.protocols. Also added DBConnectorProtocol to protocols/__init__ so both connector protocols have a single, consistent home.

# orcapod/protocols/__init__.py
from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol
from orcapod.protocols.db_connector_protocol import DBConnectorProtocol
__all__ = ["AsyncDBConnectorProtocol", "DBConnectorProtocol", ...]

Fix 3: Consolidation of shared SQL / query-parsing logic

  • SQL strings_SQL_TABLE_NAMES, _SQL_PK_COLUMNS, _SQL_COLUMN_INFO are module-level constants; sync and async schema methods each reference the same string.
  • Query-parsing helper — the FROM-clause parsing block duplicated between _resolve_column_type_lookup and _async_resolve_column_type_lookup is now a standalone _parse_table_from_query(query) -> str | None. Each lookup function is 3 lines: parse → return {} on None → single DB call.
  • Sync and async connections stay separate (_conn / _async_conn).

Fix 4: async_iter_batches cleanup robustness

Each cleanup step (cur.close, read_conn.rollback, read_conn.close) is now wrapped in its own try/except Exception block. A comment notes that full asyncio.shield() protection against CancelledError is deferred.


All 134 unit tests pass (uv run pytest tests/test_databases/ -v).

kurodo3 Bot and others added 19 commits June 5, 2026 23:48
Replace NotImplementedError stubs for async_get_table_names, async_get_pk_columns,
and async_get_column_info with real psycopg3 async implementations; add 7 unit tests.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
async_close() closes both async and sync connections, so sync schema
methods called after 'async with connector:' exits would raise.  Move
the three get_table_names/get_pk_columns/get_column_info sync calls to
before the async context in the integration test suite.

Also update the Protocol signature snippet in the spec and plan docs:
async_iter_batches is declared as 'def' (not 'async def') in the Protocol
body so that async-generator concrete implementations satisfy it
statically.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…e first yield

The SQLite async_iter_batches section said batches are "yielded lazily"
but the implementation collects list(self.iter_batches(...)) in a worker
thread before yielding any batch. Update wording to reflect that the full
result set is materialised before the first yield and that batch_size does
not enable true streaming.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ter_batches

async_close() was calling self.close() directly, which acquires a
threading.RLock and performs blocking psycopg3 network I/O on the event
loop thread. Fixed by using await asyncio.to_thread(self.close) so the
sync shutdown runs in a worker thread.

async_iter_batches() was wrapping the _dsn read and _require_open() guard
in `with self._lock:`, which could stall the event loop if a sync thread
held the lock. _dsn is immutable after __init__ and _require_open() is a
simple null-check, so neither needs lock protection from async context.
Removed the lock and annotated _dsn read accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…, fix exports

- Extract _SQL_TABLE_NAMES, _SQL_PK_COLUMNS, _SQL_COLUMN_INFO module-level
  constants; sync and async schema methods now share a single copy of each
  SQL string (no duplication)
- Extract _parse_table_from_query() helper from the two _resolve_column_type_lookup
  functions, which were identical except for the final DB call; each lookup
  function is now ~3 lines
- Remove AsyncDBConnectorProtocol from orcapod.databases — protocols belong in
  orcapod.protocols (eywalker review)
- Add DBConnectorProtocol to orcapod.protocols.__init__ alongside
  AsyncDBConnectorProtocol for a consistent export surface (Copilot review)
- Wrap each async_iter_batches cleanup step (cur.close, rollback, read_conn.close)
  in its own try/except so one failure does not prevent the others from running

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@kurodo3 kurodo3 Bot force-pushed the eywalker/plt-1453-add-async-methods-to-dbconnectorprotocol-implement-across branch from 8a1fdae to 9d3ddbc Compare June 5, 2026 23:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants